Is producer.flush() a must? 路 Issue #137 路 confluentinc/confluent

您所在的位置:网站首页 kafka callback delay Is producer.flush() a must? 路 Issue #137 路 confluentinc/confluent

Is producer.flush() a must? 路 Issue #137 路 confluentinc/confluent

2023-07-28 05:34| 来源: 网络整理| 查看: 265

i have a producer code using callback . but i see my message not calling the callback or throwing the error even if no producer. always it show success. but if kafka is up then producing the message. But I need to raise the error even if the 1st message delivery fails. I could able to get the second message but not the first. can someone please help to understand the behaviour. or how to handle the 1st message failure below is my snippet

import json from confluent_kafka import avro, KafkaError from confluent_kafka.avro import AvroProducer from confluent_kafka import Producer

logger = get_logger()

class SimpleProducer(config): .... def produce_with_callback(self, key=None, value=None, poll_timeout=305000): def delivery_callback(error, msg): if error: logger.error(f"Message failed delivery for {msg.value()} with {error}") raise KafkaError else: logger.debug(f"Message delivered to {msg.topic()},{msg.partition()}, {msg.offset()}")

produce(topic=self._topic_name, key=key, value=value, callback=lambda error, message: delivery_callback(error,message)) # Poll time set to higher than message.timeout.ms poll(poll_timeout)

configuration = {'linger.ms': 30, 'acks': 'All', 'request.timeout.ms': 10000, 'message.timeout.ms': 90000}

count = 1 jproducer = SimpleProducer( producer_id="producer_id",topic_name="test",broker_host="localhost", broker_port='9092', configurations=configuration) print(f" producing {count}-- {datetime.datetime.now()}") try: jproducer.produce_with_callback(key='1',value=json.dumps(count), poll_timeout=130)

except BufferError as e: print(f"Buffer error: {datetime.datetime.now()} \n {e}") except Exception as e: print(f"produce error: {datetime.datetime.now()} \n {e}") else: print(f"produced successfully {count}: {datetime.datetime.now()}") count+=1



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3